/*
 *
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

package org.apache.flink.connector.jdbc.table;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.factories.FactoryUtil;

import java.time.Duration;

/**
 * Options for the JDBC connector.
 *
 * <p>This class only declares {@link ConfigOption} constants (key, value type, default and
 * description); it is not instantiable. The constants are grouped into connection, scan, lookup,
 * sink and filter options.
 */
@PublicEvolving
public class JdbcConnectorOptions {

    // -----------------------------------------------------------------------------------------
    // Connection options
    // -----------------------------------------------------------------------------------------

    /** {@code url}: the JDBC database URL. String, no default. */
    public static final ConfigOption<String> URL =
            ConfigOptions.key("url").stringType().noDefaultValue().withDescription("The JDBC database URL.");

    /** {@code table-name}: the JDBC table name. String, no default. */
    public static final ConfigOption<String> TABLE_NAME =
            ConfigOptions.key("table-name").stringType().noDefaultValue().withDescription("The JDBC table name.");

    /** {@code username}: the JDBC user name. String, no default. */
    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username").stringType().noDefaultValue().withDescription("The JDBC user name.");

    /** {@code password}: the JDBC password. String, no default. */
    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password").stringType().noDefaultValue().withDescription("The JDBC password.");

    /**
     * {@code driver}: class name of the JDBC driver. String, no default; per the option
     * description the driver is derived from the URL when this is not set.
     */
    public static final ConfigOption<String> DRIVER = ConfigOptions.key("driver")
            .stringType()
            .noDefaultValue()
            .withDescription("The class name of the JDBC driver to use to connect to this URL. "
                    + "If not set, it will automatically be derived from the URL.");

    /** {@code connection.max-retry-timeout}: maximum timeout between retries. Defaults to 60 seconds. */
    public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT = ConfigOptions.key("connection.max-retry-timeout")
            .durationType()
            .defaultValue(Duration.ofSeconds(60))
            .withDescription("Maximum timeout between retries.");

    /** Sink parallelism; this is the shared {@link FactoryUtil#SINK_PARALLELISM} option re-exported. */
    public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

    // -----------------------------------------------------------------------------------------
    // Scan options
    // -----------------------------------------------------------------------------------------

    /** {@code scan.partition.column}: column name used for partitioning the input. No default. */
    public static final ConfigOption<String> SCAN_PARTITION_COLUMN = ConfigOptions.key("scan.partition.column")
            .stringType()
            .noDefaultValue()
            .withDescription("The column name used for partitioning the input.");

    /** {@code scan.partition.num}: the number of partitions. Integer, no default. */
    public static final ConfigOption<Integer> SCAN_PARTITION_NUM = ConfigOptions.key("scan.partition.num")
            .intType()
            .noDefaultValue()
            .withDescription("The number of partitions.");

    /** {@code scan.partition.lower-bound}: smallest value of the first partition. Long, no default. */
    public static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = ConfigOptions.key("scan.partition.lower-bound")
            .longType()
            .noDefaultValue()
            .withDescription("The smallest value of the first partition.");

    /** {@code scan.partition.upper-bound}: largest value of the last partition. Long, no default. */
    public static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = ConfigOptions.key("scan.partition.upper-bound")
            .longType()
            .noDefaultValue()
            .withDescription("The largest value of the last partition.");

    /**
     * {@code scan.partition.by-datetime}: flag for partitioning by datetime. Defaults to
     * {@code false}.
     *
     * <p>NOTE(review): how the long lower/upper bounds are interpreted when this flag is enabled
     * is not visible in this class; confirm against the code that reads the option.
     */
    public static final ConfigOption<Boolean> SCAN_PARTITION_BY_DATETIME = ConfigOptions.key(
                    "scan.partition.by-datetime")
            .booleanType()
            .defaultValue(false)
            .withDescription("partition by datetime.");

    /**
     * {@code scan.fetch-size}: hint for the number of rows fetched per round-trip when reading.
     * Defaults to {@code 0}, which per the description means the hint is ignored.
     */
    public static final ConfigOption<Integer> SCAN_FETCH_SIZE = ConfigOptions.key("scan.fetch-size")
            .intType()
            .defaultValue(0)
            .withDescription("Gives the reader a hint as to the number of rows that should be"
                    + " fetched from the database per round-trip when reading. If the"
                    + " value is zero, this hint is ignored.");

    /** {@code scan.auto-commit}: whether the driver is in auto-commit mode. Defaults to {@code true}. */
    public static final ConfigOption<Boolean> SCAN_AUTO_COMMIT = ConfigOptions.key("scan.auto-commit")
            .booleanType()
            .defaultValue(true)
            .withDescription("Sets whether the driver is in auto-commit mode.");

    // -----------------------------------------------------------------------------------------
    // Lookup options
    // -----------------------------------------------------------------------------------------

    /**
     * {@code lookup.cache.max-rows}: maximum number of rows kept in the lookup cache. Defaults to
     * {@code -1}.
     *
     * <p>The description names the full option keys ({@code lookup.cache.max-rows} and
     * {@code lookup.cache.ttl}) so that users can copy them verbatim.
     *
     * <p>NOTE(review): presumably {@code -1} means the cache is disabled; confirm against the
     * code that consumes this option.
     */
    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
            .longType()
            .defaultValue(-1L)
            .withDescription("The max number of rows of lookup cache, over this value, the oldest"
                    + " rows will be eliminated. \"lookup.cache.max-rows\" and \"lookup.cache.ttl\""
                    + " options must all be specified if any of them is specified.");

    /** {@code lookup.cache.ttl}: the cache time to live. Defaults to 10 seconds. */
    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl")
            .durationType()
            .defaultValue(Duration.ofSeconds(10))
            .withDescription("The cache time to live.");

    /** {@code lookup.max-retries}: maximum retries when a database lookup fails. Defaults to 3. */
    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("The max retry times if lookup database failed.");

    // -----------------------------------------------------------------------------------------
    // Sink options
    // -----------------------------------------------------------------------------------------

    /**
     * {@code sink.buffer-flush.max-rows}: number of buffered records (append, upsert and delete)
     * above which data is flushed. Defaults to 100.
     */
    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions.key(
                    "sink.buffer-flush.max-rows")
            .intType()
            .defaultValue(100)
            .withDescription("The flush max size (includes all append, upsert and delete records),"
                    + " over this number of records, will flush data.");

    /**
     * {@code sink.buffer-flush.interval}: flush interval as a {@link Duration}. Defaults to
     * 1 second.
     */
    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key(
                    "sink.buffer-flush.interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(1))
            .withDescription("The flush interval, over this time, asynchronous threads will flush data.");

    /** {@code sink.max-retries}: maximum retries when writing records fails. Defaults to 3. */
    public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("The max retry times if writing records to database failed.");

    // -----------------------------------------------------------------------------------------
    // Filter options (lookup and scan)
    // -----------------------------------------------------------------------------------------

    /**
     * {@code data.filter}: a basic filter for lookup or scan queries. String, no default.
     *
     * <p>NOTE(review): the description says the value is "auto added after where", so it is
     * presumably appended to the generated SQL as-is; confirm in the query builder that it only
     * ever comes from trusted table configuration.
     */
    public static final ConfigOption<String> DATA_FILTER = ConfigOptions.key("data.filter")
            .stringType()
            .noDefaultValue()
            .withDescription("the basic filter(lookup or scan), auto added after where.");

    /** Not instantiable: this class only holds option constants. */
    private JdbcConnectorOptions() {}
}
